{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# How to use callbacks in async environments\n",
    "\n",
    ":::info Prerequisites\n",
    "\n",
    "This guide assumes familiarity with the following concepts:\n",
    "\n",
    "- [Callbacks](/docs/concepts/#callbacks)\n",
    "- [Custom callback handlers](/docs/how_to/custom_callbacks)\n",
    ":::\n",
    "\n",
    "If you are planning to use the async APIs, it is recommended to use and extend [`AsyncCallbackHandler`](https://api.python.langchain.com/en/latest/callbacks/langchain_core.callbacks.base.AsyncCallbackHandler.html) to avoid blocking the event.\n",
    "\n",
    "\n",
    ":::{.callout-warning}\n",
    "If you use a sync `CallbackHandler` while using an async method to run your LLM / Chain / Tool / Agent, it will still work. However, under the hood, it will be called with [`run_in_executor`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) which can cause issues if your `CallbackHandler` is not thread-safe.\n",
    ":::\n",
    "\n",
    ":::{.callout-danger}\n",
    "\n",
    "If you're on `python<=3.10`, you need to remember to propagate `config` or `callbacks` when invoking other `runnable` from within a `RunnableLambda`, `RunnableGenerator` or `@tool`. If you do not do this,\n",
    "the callbacks will not be propagated to the child runnables being invoked.\n",
    ":::"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# | output: false\n",
    "# | echo: false\n",
    "\n",
    "%pip install -qU langchain langchain_anthropic\n",
    "\n",
    "import getpass\n",
    "import os\n",
    "\n",
    "os.environ[\"ANTHROPIC_API_KEY\"] = getpass.getpass()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "zzzz....\n",
      "Hi! I just woke up. Your llm is starting\n",
      "Sync handler being called in a `thread_pool_executor`: token: Here\n",
      "Sync handler being called in a `thread_pool_executor`: token: 's\n",
      "Sync handler being called in a `thread_pool_executor`: token:  a\n",
      "Sync handler being called in a `thread_pool_executor`: token:  little\n",
      "Sync handler being called in a `thread_pool_executor`: token:  joke\n",
      "Sync handler being called in a `thread_pool_executor`: token:  for\n",
      "Sync handler being called in a `thread_pool_executor`: token:  you\n",
      "Sync handler being called in a `thread_pool_executor`: token: :\n",
      "Sync handler being called in a `thread_pool_executor`: token: \n",
      "\n",
      "Why\n",
      "Sync handler being called in a `thread_pool_executor`: token:  can\n",
      "Sync handler being called in a `thread_pool_executor`: token: 't\n",
      "Sync handler being called in a `thread_pool_executor`: token:  a\n",
      "Sync handler being called in a `thread_pool_executor`: token:  bicycle\n",
      "Sync handler being called in a `thread_pool_executor`: token:  stan\n",
      "Sync handler being called in a `thread_pool_executor`: token: d up\n",
      "Sync handler being called in a `thread_pool_executor`: token:  by\n",
      "Sync handler being called in a `thread_pool_executor`: token:  itself\n",
      "Sync handler being called in a `thread_pool_executor`: token: ?\n",
      "Sync handler being called in a `thread_pool_executor`: token:  Because\n",
      "Sync handler being called in a `thread_pool_executor`: token:  it\n",
      "Sync handler being called in a `thread_pool_executor`: token: 's\n",
      "Sync handler being called in a `thread_pool_executor`: token:  two\n",
      "Sync handler being called in a `thread_pool_executor`: token: -\n",
      "Sync handler being called in a `thread_pool_executor`: token: tire\n",
      "zzzz....\n",
      "Hi! I just woke up. Your llm is ending\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "LLMResult(generations=[[ChatGeneration(text=\"Here's a little joke for you:\\n\\nWhy can't a bicycle stand up by itself? Because it's two-tire\", message=AIMessage(content=\"Here's a little joke for you:\\n\\nWhy can't a bicycle stand up by itself? Because it's two-tire\", id='run-8afc89e8-02c0-4522-8480-d96977240bd4-0'))]], llm_output={}, run=[RunInfo(run_id=UUID('8afc89e8-02c0-4522-8480-d96977240bd4'))])"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import asyncio\n",
    "from typing import Any, Dict, List\n",
    "\n",
    "from langchain_anthropic import ChatAnthropic\n",
    "from langchain_core.callbacks import AsyncCallbackHandler, BaseCallbackHandler\n",
    "from langchain_core.messages import HumanMessage\n",
    "from langchain_core.outputs import LLMResult\n",
    "\n",
    "\n",
    "class MyCustomSyncHandler(BaseCallbackHandler):\n",
    "    def on_llm_new_token(self, token: str, **kwargs) -> None:\n",
    "        print(f\"Sync handler being called in a `thread_pool_executor`: token: {token}\")\n",
    "\n",
    "\n",
    "class MyCustomAsyncHandler(AsyncCallbackHandler):\n",
    "    \"\"\"Async callback handler that can be used to handle callbacks from langchain.\"\"\"\n",
    "\n",
    "    async def on_llm_start(\n",
    "        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any\n",
    "    ) -> None:\n",
    "        \"\"\"Run when chain starts running.\"\"\"\n",
    "        print(\"zzzz....\")\n",
    "        await asyncio.sleep(0.3)\n",
    "        class_name = serialized[\"name\"]\n",
    "        print(\"Hi! I just woke up. Your llm is starting\")\n",
    "\n",
    "    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:\n",
    "        \"\"\"Run when chain ends running.\"\"\"\n",
    "        print(\"zzzz....\")\n",
    "        await asyncio.sleep(0.3)\n",
    "        print(\"Hi! I just woke up. Your llm is ending\")\n",
    "\n",
    "\n",
    "# To enable streaming, we pass in `streaming=True` to the ChatModel constructor\n",
    "# Additionally, we pass in a list with our custom handler\n",
    "chat = ChatAnthropic(\n",
    "    model=\"claude-3-sonnet-20240229\",\n",
    "    max_tokens=25,\n",
    "    streaming=True,\n",
    "    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],\n",
    ")\n",
    "\n",
    "await chat.agenerate([[HumanMessage(content=\"Tell me a joke\")]])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Next steps\n",
    "\n",
    "You've now learned how to create your own custom callback handlers.\n",
    "\n",
    "Next, check out the other how-to guides in this section, such as [how to attach callbacks to a runnable](/docs/how_to/callbacks_attach)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
